[[testing]]
= Testing Applications

The `spring-kafka-test` jar contains some useful utilities to assist with testing your applications.

[[ekb]]
== Embedded Kafka Broker

Two implementations are provided:

* `EmbeddedKafkaZKBroker` - legacy implementation which starts an embedded `Zookeeper` instance (which is still the default when using `EmbeddedKafka`).
* `EmbeddedKafkaKraftBroker` - uses `Kraft` instead of `Zookeeper` in combined controller and broker modes (since 3.1).

There are several techniques to configure the broker as discussed in the following sections.

[[ktu]]
== KafkaTestUtils

`org.springframework.kafka.test.utils.KafkaTestUtils` provides a number of static helper methods to consume records, retrieve various record offsets, and others.
Refer to its https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/test/utils/KafkaTestUtils.html[Javadocs] for complete details.

[[junit]]
== JUnit

`org.springframework.kafka.test.utils.KafkaTestUtils` also provides some static methods to set up producer and consumer properties.
The following listing shows those method signatures:

[source, java]
----
/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
----

[NOTE]
====
Starting with version 2.5, the `consumerProps` method sets the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest`.
This is because, in most cases, you want the consumer to consume any messages sent in a test case.
The `ConsumerConfig` default is `latest` which means that messages already sent by a test, before the consumer starts, will not receive those records.
To revert to the previous behavior, set the property to `latest` after calling the method.

When using the embedded broker, it is generally best practice using a different topic for each test, to prevent cross-talk.
If this is not possible for some reason, note that the `consumeFromEmbeddedTopics` method's default behavior is to seek the assigned partitions to the beginning after assignment.
Since it does not have access to the consumer properties, you must use the overloaded method that takes a `seekToEnd` boolean parameter to seek to the end instead of the beginning.
====

A JUnit 4 `@Rule` wrapper for the `EmbeddedKafkaZKBroker` is provided to create an embedded Kafka and an embedded Zookeeper server.
(See xref:testing.adoc#embedded-kafka-annotation[@EmbeddedKafka Annotation] for information about using `@EmbeddedKafka` with JUnit 5).
The following listing shows the signatures of those methods:

[source, java]
----
/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
----

NOTE: The `EmbeddedKafkaKraftBroker` is not supported with JUnit4.

The `EmbeddedKafkaBroker` class has a utility method that lets you consume for all the topics it created.
The following example shows how to use it:

[source, java]
----
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
----

The `KafkaTestUtils` has some utility methods to fetch results from the consumer.
The following listing shows those method signatures:

[source, java]
----
/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
----

The following example shows how to use `KafkaTestUtils`:

[source, java]
----
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
----

When the embedded Kafka and embedded Zookeeper server are started by the `EmbeddedKafkaBroker`, a system property named `spring.embedded.kafka.brokers` is set to the address of the Kafka brokers and a system property named `spring.embedded.zookeeper.connect` is set to the address of Zookeeper.
Convenient constants (`EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS` and `EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT`) are provided for this property.

Instead of default `spring.embedded.kafka.brokers` system property, the address of the Kafka brokers can be exposed to any arbitrary and convenient property.
For this purpose a `spring.embedded.kafka.brokers.property` (`EmbeddedKafkaBroker.BROKER_LIST_PROPERTY`) system property can be set before starting an embedded Kafka.
For example, with Spring Boot a `spring.kafka.bootstrap-servers` configuration property is expected to be set for auto-configuring Kafka client, respectively.
So, before running tests with an embedded Kafka on random ports, we can set `spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers` as a system property - and the `EmbeddedKafkaBroker`  will use it to expose its broker addresses.
This is now the default value for this property (starting with version 3.0.10).

With the `EmbeddedKafkaBroker.brokerProperties(Map<String, String>)`, you can provide additional properties for the Kafka servers.
See https://kafka.apache.org/documentation/#brokerconfigs[Kafka Config] for more information about possible broker properties.

[[configuring-topics]]
== Configuring Topics

The following example configuration creates topics called `cat` and `hat` with five partitions, a topic called `thing1` with 10 partitions, and a topic called `thing2` with 15 partitions:

[source, java]
----
public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}
----

By default, `addTopics` will throw an exception when problems arise (such as adding a topic that already exists).
Version 2.6 added a new version of that method that returns a `Map<String, Exception>`; the key is the topic name and the value is `null` for success, or an `Exception` for a failure.

[[same-broker-multiple-tests]]
== Using the Same Broker(s) for Multiple Test Classes

You can use the same broker for multiple test classes with something similar to the following:

[source, java]
----
public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}
----

This assumes a Spring Boot environment and the embedded broker replaces the bootstrap servers property.

Then, in each test class, you can use something similar to the following:

[source, java]
----
static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
----

If you are not using Spring Boot, you can obtain the bootstrap servers using `broker.getBrokersAsString()`.

IMPORTANT: The preceding example provides no mechanism for shutting down the broker(s) when all tests are complete.
This could be a problem if, say, you run your tests in a Gradle daemon.
You should not use this technique in such a situation, or you should use something to call `destroy()` on the `EmbeddedKafkaBroker` when your tests are complete.

Starting with version 3.0, the framework exposes a `GlobalEmbeddedKafkaTestExecutionListener` for the JUnit Platform; it is disabled by default.
This requires JUnit Platform 1.8 or greater.
The purpose of this listener is to start one global `EmbeddedKafkaBroker` for the whole test plan and stop it at the end of the plan.
To enable this listener, and therefore have a single global embedded Kafka cluster for all the tests in the project, the `spring.kafka.global.embedded.enabled` property must be set to `true` via system properties or JUnit Platform configuration.
In addition, these properties can be provided:

- `spring.kafka.embedded.count` - the number of Kafka brokers to manage;
- `spring.kafka.embedded.ports` - ports (comma-separated value) for every Kafka broker to start, `0` if random port is preferred; the number of values must be equal to the `count` mentioned above;
- `spring.kafka.embedded.topics` - topics (comma-separated value) to create in the started Kafka cluster;
- `spring.kafka.embedded.partitions` - number of partitions to provision for the created topics;
- `spring.kafka.embedded.broker.properties.location` - the location of the file for additional Kafka broker configuration properties; the value of this property must follow the Spring resource abstraction pattern;
- `spring.kafka.embedded.kraft` - default false, when true, use an `EmbeddedKafkaKraftBroker` instead of an `EmbeddedKafkaZKBroker`.

Essentially these properties mimic some of the `@EmbeddedKafka` attributes.

See more information about configuration properties and how to provide them in the https://junit.org/junit5/docs/current/user-guide/#running-tests-config-params[JUnit 5 User Guide].
For example, a `spring.embedded.kafka.brokers.property=my.bootstrap-servers` entry can be added into a `junit-platform.properties` file in the testing classpath.
Starting with version 3.0.10, the broker automatically sets this to `spring.kafka.bootstrap-servers`, by default, for testing with Spring Boot applications.

NOTE: It is recommended to not combine a global embedded Kafka and per-test class in a single test suite.
Both of them share the same system properties, so it is very likely going to lead to unexpected behavior.

NOTE: `spring-kafka-test` has transitive dependencies on `junit-jupiter-api` and `junit-platform-launcher` (the latter to support the global embedded broker).
If you wish to use the embedded broker and are NOT using JUnit, you may wish to exclude these dependencies.

[[embedded-kafka-annotation]]
== `@EmbeddedKafka` Annotation
We generally recommend that you use the rule as a `@ClassRule` to avoid starting and stopping the broker between tests (and use a different topic for each test).
Starting with version 2.0, if you use Spring's test application context caching, you can also declare a `EmbeddedKafkaBroker` bean, so a single broker can be used across multiple test classes.
For convenience, we provide a test class-level annotation called `@EmbeddedKafka` to register the `EmbeddedKafkaBroker` bean.
The following example shows how to use it:

[source, java]
----
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}
----

Starting with version 2.2.4, you can also use the `@EmbeddedKafka` annotation to specify the Kafka ports property.

Starting with version 3.2, set the `kraft` property to `true` to use an `EmbeddedKafkaKraftBroker` instead of an `EmbeddedKafkaZKBroker`.

The following example sets the `topics`, `brokerProperties`, and `brokerPropertiesLocation` attributes of `@EmbeddedKafka` support property placeholder resolutions:

[source, java]
----
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")
----

In the preceding example, the property placeholders `${kafka.topics.another-topic}`, `${kafka.broker.logs-dir}`, and `${kafka.broker.port}` are resolved from the Spring `Environment`.
In addition, the broker properties are loaded from the `broker.properties` classpath resource specified by the `brokerPropertiesLocation`.
Property placeholders are resolved for the `brokerPropertiesLocation` URL and for any property placeholders found in the resource.
Properties defined by `brokerProperties` override properties found in `brokerPropertiesLocation`.

You can use the `@EmbeddedKafka` annotation with JUnit 4 or JUnit 5.

[[embedded-kafka-junit5]]
== `@EmbeddedKafka` Annotation with JUnit5

Starting with version 2.3, there are two ways to use the `@EmbeddedKafka` annotation with JUnit5.
When used with the `@SpringJunitConfig` annotation, the embedded broker is added to the test application context.
You can auto wire the broker into your test, at the class or method level, to get the broker address list.

When *not* using the spring test context, the `EmbdeddedKafkaCondition` creates a broker; the condition includes a parameter resolver so you can access the broker in your test method.

[source, java]
----
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}
----

A standalone broker (outside the Spring's TestContext) will be created unless a class annotated `@EmbeddedKafka` is also annotated (or meta-annotated) with `ExtendWith(SpringExtension.class)`.
`@SpringJunitConfig` and `@SpringBootTest` are so meta-annotated and the context-based broker will be used when either of those annotations are also present.

IMPORTANT: When there is a Spring test application context available, the topics and broker properties can contain property placeholders, which will be resolved as long as the property is defined somewhere.
If there is no Spring context available, these placeholders won't be resolved.

[[embedded-broker-in-springboottest-annotations]]
== Embedded Broker in `@SpringBootTest` Annotations

https://start.spring.io/[Spring Initializr] now automatically adds the `spring-kafka-test` dependency in test scope to the project configuration.

[IMPORTANT]
====
If your application uses the Kafka binder in `spring-cloud-stream` and if you want to use an embedded broker for tests, you must remove the `spring-cloud-stream-test-support` dependency, because it replaces the real binder with a test binder for test cases.
If you wish some tests to use the test binder and some to use the embedded broker, tests that use the real binder need to disable the test binder by excluding the binder auto configuration in the test class.
The following example shows how to do so:

=====
[source, java]
----
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}
----
=====
====

There are several ways to use an embedded broker in a Spring Boot application test.

They include:

* xref:testing.adoc#kafka-testing-junit4-class-rule[JUnit4 Class Rule]
* xref:testing.adoc#kafka-testing-embeddedkafka-annotation[`@EmbeddedKafka` Annotation or `EmbeddedKafkaBroker` Bean]

[[kafka-testing-junit4-class-rule]]
=== JUnit4 Class Rule

The following example shows how to use a JUnit4 class rule to create an embedded broker:

[source, java]
----
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}
----

Notice that, since this is a Spring Boot application, we override the broker list property to set Spring Boot's property.

[[embedded-broker-with-springjunitconfig-annotations]]
== `@EmbeddedKafka` with `@SpringJunitConfig`

When using `@EmbeddedKafka` with `@SpringJUnitConfig`, it is recommended to use `@DirtiesContext` on the test class.
This is to prevent potential race conditions occurring during the JVM shutdown after running multiple tests in a test suite.
For example, without using `@DirtiesContext`, the `EmbeddedKafkaBroker` may shutdown earlier while the application context still needs resources from it.
Since every `EmbeddedKafka` test-runs create its own temporary directory, when this race condition occurs, it will produce error log messages indicating that the files that it is trying to delete or cleanup are not available anymore.
Adding `@DirtiesContext` will ensure that the application context is cleaned up after each test and not cached, making it less vulnerable to potential resource race conditions like these.


[[kafka-testing-embeddedkafka-annotation]]
=== `@EmbeddedKafka` Annotation or `EmbeddedKafkaBroker` Bean

The following example shows how to use an `@EmbeddedKafka` Annotation to create an embedded broker:

[source, java]
----
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}
----

NOTE: The `bootstrapServersProperty` is automatically set to `spring.kafka.bootstrap-servers` by default, starting with version 3.0.10.

[[hamcrest-matchers]]
== Hamcrest Matchers

The `org.springframework.kafka.test.hamcrest.KafkaMatchers` provides the following matchers:

[source, java]
----
/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}
----

[[assertj-conditions]]
== AssertJ Conditions

You can use the following AssertJ conditions:

[source, java]
----
/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}
----

[[example]]
== Example

The following example brings together most of the topics covered in this chapter:

[source, java]
----
public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}
----

The preceding example uses the Hamcrest matchers.
With `AssertJ`, the final part looks like the following code:

[source, java]
----
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
----

[[mock-cons-prod]]
== Mock Consumer and Producer

The `kafka-clients` library provides `MockConsumer` and `MockProducer` classes for testing purposes.

If you wish to use these classes in some of your tests with listener containers or `KafkaTemplate` respectively, starting with version 3.0.7, the framework now provides `MockConsumerFactory` and `MockProducerFactory` implementations.

These factories can be used in the listener container and template instead of the default factories, which require a running (or embedded) broker.

Here is an example of a simple implementation returning a single consumer:

[source, java]
----
@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}
----

If you wish to test with concurrency, the `Supplier` lambda in the factory's constructor would need to create a new instance each time.

With the `MockProducerFactory`, there are two constructors; one to create a simple factory, and one to create factory that supports transactions.

Here are examples:

[source, java]
----
@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() -> 
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer = 
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}
----

Notice in the second case, the lambda is a `BiFunction<Boolean, String>` where the first parameter is true if the caller wants a transactional producer; the optional second parameter contains the transactional id.
This can be the default (as provided in the constructor), or can be overridden by the `KafkaTransactionManager` (or `KafkaTemplate` for local transactions), if so configured.
The transactional id is provided in case you wish to use a different `MockProducer` based on this value.

If you are using producers in a multi-threaded environment, the `BiFunction` should return multiple producers (perhaps thread-bound using a `ThreadLocal`).

IMPORTANT: Transactional `MockProducer`+++s+++ must be initialized for transactions by calling `initTransaction()`.

When using the `MockProducer`, if you do not want to close the producer after each send, then you can provide a custom `MockProducer` implementation that overrides the `close` method that does not call the `close` method from the super class.
This is convenient for testing, when verifying multiple publishing on the same producer without closing it.

Here is an example:

[source,java]
----
@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}

----
